diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index 3098daa5da..35facd89f1 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -322,7 +322,18 @@ object PropertiesUtils extends Logger { } } programArgs += value.substring(1, value.length - 1) - case _ => programArgs += v + case _ => + val regexp = "(.*)='(.*)'$" + if (v.matches(regexp)) { + programArgs += v.replaceAll(regexp, "$1=$2") + } else { + val regexp = "(.*)=\"(.*)\"$" + if (v.matches(regexp)) { + programArgs += v.replaceAll(regexp, "$1=$2") + } else { + programArgs += v + } + } } } } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala index 83db1d78c7..d20b7bb6ed 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala @@ -30,6 +30,7 @@ import org.apache.http.client.methods.HttpGet import org.apache.http.client.protocol.HttpClientContext import org.apache.http.impl.client.HttpClients +import java.io.IOException import java.net.InetAddress import java.security.PrivilegedExceptionAction import java.util @@ -257,20 +258,19 @@ object YarnUtils extends Logger { * url * @return */ + @throws[IOException] def restRequest(url: String): String = { if (url == null) return null - url match { case u if u.matches("^http(|s)://.*") => Try(request(url)) match { case Success(v) => v case Failure(e) => if (hasYarnHttpKerberosAuth) { - logError(s"yarnUtils authRestRequest error, url: $u, detail: $e") + throw new IOException(s"yarnUtils authRestRequest error, url: $u, detail: $e") } else { - logError(s"yarnUtils restRequest error, url: $u, detail: $e") + throw new IOException(s"yarnUtils restRequest error, url: $u, detail: $e") } - null } case _ => Try(request(s"${getRMWebAppURL()}/$url")) match { @@ -281,8 +281,7 @@ object YarnUtils extends Logger { } match { case Success(v) => v case Failure(e) => - logError(s"yarnUtils restRequest retry 5 times all failed. detail: $e") - null + throw new IOException(s"yarnUtils restRequest retry 5 times all failed. detail: $e") } } } diff --git a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala index 995cfd2ece..827d0f11cc 100644 --- a/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala +++ b/streampark-common/src/test/scala/org/apache/streampark/common/util/PropertiesUtilsTestCase.scala @@ -24,8 +24,20 @@ import scala.language.postfixOps class PropertiesUtilsTestCase { @Test def testExtractProgramArgs(): Unit = { - val args = - "mysql-sync-database \n--database employees \n--mysql-conf hostname=127.0.0.1 \n--mysql-conf port=3306 \n--mysql-conf username=root \n--mysql-conf password=123456 \n--mysql-conf database-name=employees \n--including-tables 'test|test.*' \n--sink-conf fenodes=127.0.0.1:8030 \n--sink-conf username=root \n--sink-conf password= \n--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \n--sink-conf sink.label-prefix=label\n--table-conf replication_num=1 " + val args = "mysql-sync-table \n" + + "--warehouse hdfs:///paimon \n" + + "--database test_db \n" + + "--table test_table \n" + + "--mysql-conf hostname=localhost \n" + + "--mysql-conf username=root \n" + + "--mysql-conf password=123456 \n" + + "--mysql-conf database-name='employees' \n" + + "--mysql-conf table-name='employees' \n" + + "--catalog-conf metastore=hive \n" + + "--catalog-conf uri=thrift://localhost:9083 \n" + + "--table-conf bucket=1 \n" + + "--table-conf changelog-producer=input \n" + + "--table-conf sink.parallelism=1" val programArgs = new ArrayBuffer[String]() programArgs ++= PropertiesUtils.extractArguments(args) println(programArgs) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenDependency.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenDependency.java new file mode 100644 index 0000000000..ec6bb14a84 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenDependency.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import org.apache.streampark.common.conf.Workspace; +import org.apache.streampark.common.util.FileUtils; +import org.apache.streampark.console.base.util.JacksonUtils; +import org.apache.streampark.console.base.util.WebUtils; +import org.apache.streampark.flink.packer.maven.Artifact; +import org.apache.streampark.flink.packer.maven.MavenArtifact; + +import org.apache.commons.lang3.StringUtils; + +import lombok.Data; +import lombok.SneakyThrows; + +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +@Data +public class MavenDependency { + + private Set pom = Collections.emptySet(); + + private Set jar = Collections.emptySet(); + + @SneakyThrows + public static MavenDependency of(String dependency) { + if (StringUtils.isNotBlank(dependency)) { + return JacksonUtils.read(dependency, MavenDependency.class); + } + return new MavenDependency(); + } + + @Override + public boolean equals(Object that) { + if (this == that) { + return true; + } + + if (that == null || getClass() != that.getClass()) { + return false; + } + + MavenDependency thatDep = (MavenDependency) that; + + if (this.pom.size() != thatDep.pom.size() + || this.jar.size() != thatDep.jar.size() + || !this.pom.containsAll(thatDep.pom)) { + return false; + } + + File localJar = WebUtils.getAppTempDir(); + File localUploads = new File(Workspace.local().APP_UPLOADS()); + for (String jarName : jar) { + if (!thatDep.jar.contains(jarName) + || !FileUtils.equals(new File(localJar, jarName), new File(localUploads, jarName))) { + return false; + } + } + + return true; + } + + public MavenArtifact toMavenArtifact() { + List mvnArts = + this.pom.stream() + .map( + pom -> + new Artifact( + pom.getGroupId(), + pom.getArtifactId(), + pom.getVersion(), + pom.getClassifier(), + pom.toExclusionString())) + .collect(Collectors.toList()); + List extJars = + this.jar.stream() + .map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar) + .collect(Collectors.toList()); + return new MavenArtifact(mvnArts, extJars); + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenExclusion.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenExclusion.java new file mode 100644 index 0000000000..9ab025f36a --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenExclusion.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import lombok.Data; + +import java.util.Objects; + +@Data +public class MavenExclusion { + + private String groupId; + + private String artifactId; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + MavenExclusion that = (MavenExclusion) o; + if (this.groupId == null || that.groupId == null) { + return false; + } + if (this.artifactId == null || that.artifactId == null) { + return false; + } + return this.groupId.equals(that.groupId) && this.artifactId.equals(that.artifactId); + } + + @Override + public int hashCode() { + return Objects.hash(groupId, artifactId); + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenPom.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenPom.java new file mode 100644 index 0000000000..21164061c1 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/MavenPom.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import org.apache.commons.lang3.StringUtils; + +import lombok.Data; + +import java.util.Collections; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +@Data +public class MavenPom { + private String groupId; + private String artifactId; + private String version; + private String classifier; + private Set exclusions = Collections.emptySet(); + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MavenPom that = (MavenPom) o; + + boolean basic = + this.groupId.equals(that.groupId) + && this.artifactId.equals(that.artifactId) + && this.version.equals(that.version); + + boolean classify = + StringUtils.isAllBlank(this.classifier, that.classifier) + || this.classifier.equals(that.classifier); + + if (basic && classify) { + Set thisEx = + this.exclusions == null ? Collections.emptySet() : this.exclusions; + Set thatEx = + that.exclusions == null ? Collections.emptySet() : that.exclusions; + return thisEx.size() == thatEx.size() && thisEx.containsAll(thatEx); + } + return false; + } + + public String artifactName() { + if (StringUtils.isBlank(classifier)) { + return String.format("%s-%s.jar", artifactId, version); + } + return String.format("%s-%s-%s.jar", artifactId, version, classifier); + } + + @Override + public int hashCode() { + return Objects.hash(groupId, artifactId, version, classifier); + } + + @Override + public String toString() { + return groupId + ":" + artifactId + ":" + version + getClassifier(); + } + + public Set toExclusionString() { + return this.exclusions.stream() + .map(x -> String.format("%s:%s", x.getGroupId(), x.getArtifactId())) + .collect(Collectors.toSet()); + } + + public String getClassifier() { + return StringUtils.isBlank(classifier) ? "" : ":" + classifier; + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index 5b3aa4db28..d963c54777 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -26,20 +26,17 @@ import org.apache.streampark.common.enums.FlinkK8sRestExposedType; import org.apache.streampark.common.enums.StorageType; import org.apache.streampark.common.fs.FsOperator; -import org.apache.streampark.common.util.FileUtils; -import org.apache.streampark.common.util.Utils; import org.apache.streampark.console.base.util.JacksonUtils; import org.apache.streampark.console.base.util.ObjectUtils; -import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.bean.AppControl; +import org.apache.streampark.console.core.bean.MavenDependency; import org.apache.streampark.console.core.enums.FlinkAppState; import org.apache.streampark.console.core.enums.ReleaseState; import org.apache.streampark.console.core.enums.ResourceFrom; import org.apache.streampark.console.core.metrics.flink.JobsOverview; import org.apache.streampark.console.core.utils.YarnQueueLabelExpression; import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates; -import org.apache.streampark.flink.packer.maven.Artifact; -import org.apache.streampark.flink.packer.maven.DependencyInfo; +import org.apache.streampark.flink.packer.maven.MavenArtifact; import org.apache.commons.lang3.StringUtils; @@ -49,24 +46,19 @@ import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.core.type.TypeReference; import lombok.Data; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import javax.annotation.Nonnull; -import java.io.File; import java.io.Serializable; import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.stream.Collectors; import static org.apache.streampark.console.core.enums.FlinkAppState.of; @@ -487,13 +479,13 @@ public boolean isStreamParkJob() { @JsonIgnore @SneakyThrows - public Dependency getDependencyObject() { - return Dependency.toDependency(this.dependency); + public MavenDependency getMavenDependency() { + return MavenDependency.of(this.dependency); } @JsonIgnore - public DependencyInfo getDependencyInfo() { - return Application.Dependency.toDependency(getDependency()).toJarPackDeps(); + public MavenArtifact getMavenArtifact() { + return getMavenDependency().toMavenArtifact(); } @JsonIgnore @@ -650,65 +642,6 @@ private boolean needFillYarnQueueLabel(ExecutionMode mode) { return ExecutionMode.YARN_PER_JOB.equals(mode) || ExecutionMode.YARN_APPLICATION.equals(mode); } - @Data - public static class Dependency { - private List pom = Collections.emptyList(); - private List jar = Collections.emptyList(); - - @SneakyThrows - public static Dependency toDependency(String dependency) { - if (Utils.notEmpty(dependency)) { - return JacksonUtils.read(dependency, new TypeReference() {}); - } - return new Dependency(); - } - - public boolean isEmpty() { - return pom.isEmpty() && jar.isEmpty(); - } - - public boolean eq(Dependency other) { - if (other == null) { - return false; - } - if (this.isEmpty() && other.isEmpty()) { - return true; - } - - if (this.pom.size() != other.pom.size() || this.jar.size() != other.jar.size()) { - return false; - } - File localJar = WebUtils.getAppTempDir(); - File localUploads = new File(Workspace.local().APP_UPLOADS()); - HashSet otherJars = new HashSet<>(other.jar); - for (String jarName : jar) { - if (!otherJars.contains(jarName) - || !FileUtils.equals(new File(localJar, jarName), new File(localUploads, jarName))) { - return false; - } - } - return new HashSet<>(pom).containsAll(other.pom); - } - - public DependencyInfo toJarPackDeps() { - List mvnArts = - this.pom.stream() - .map( - pom -> - new Artifact( - pom.getGroupId(), - pom.getArtifactId(), - pom.getVersion(), - pom.getClassifier())) - .collect(Collectors.toList()); - List extJars = - this.jar.stream() - .map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar) - .collect(Collectors.toList()); - return new DependencyInfo(mvnArts, extJars); - } - } - @Override public boolean equals(Object o) { if (this == o) { @@ -724,37 +657,4 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(id); } - - @Data - public static class Pom { - private String groupId; - private String artifactId; - private String version; - private String classifier; - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - return this.toString().equals(o.toString()); - } - - @Override - public int hashCode() { - return Objects.hash(groupId, artifactId, version, classifier); - } - - @Override - public String toString() { - return groupId + ":" + artifactId + ":" + version + getClassifier(); - } - - private String getClassifier() { - return StringUtils.isEmpty(classifier) ? "" : ":" + classifier; - } - } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java index 57be322d7f..d944e14f8e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkSql.java @@ -18,6 +18,7 @@ package org.apache.streampark.console.core.entity; import org.apache.streampark.common.util.DeflaterUtils; +import org.apache.streampark.console.core.bean.MavenDependency; import org.apache.streampark.console.core.enums.ChangedType; import com.baomidou.mybatisplus.annotation.IdType; @@ -82,12 +83,10 @@ public ChangedType checkChange(FlinkSql target) { // 1) determine if sql statement has changed boolean sqlDifference = !this.getSql().trim().equals(target.getSql().trim()); // 2) determine if dependency has changed - Application.Dependency thisDependency = - Application.Dependency.toDependency(this.getDependency()); - Application.Dependency targetDependency = - Application.Dependency.toDependency(target.getDependency()); + MavenDependency thisDependency = MavenDependency.of(this.getDependency()); + MavenDependency targetDependency = MavenDependency.of(target.getDependency()); - boolean depDifference = !thisDependency.eq(targetDependency); + boolean depDifference = !thisDependency.equals(targetDependency); if (sqlDifference && depDifference) { return ChangedType.ALL; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 07c66ff05d..33afcf8812 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -50,6 +50,7 @@ import org.apache.streampark.console.core.service.SettingService; import org.apache.streampark.console.core.task.FlinkRESTAPIWatcher; import org.apache.streampark.flink.packer.docker.DockerConf; +import org.apache.streampark.flink.packer.maven.Artifact; import org.apache.streampark.flink.packer.maven.MavenTool; import org.apache.streampark.flink.packer.pipeline.BuildPipeline; import org.apache.streampark.flink.packer.pipeline.BuildResult; @@ -71,6 +72,7 @@ import org.apache.streampark.flink.packer.pipeline.impl.FlinkRemoteBuildPipeline; import org.apache.streampark.flink.packer.pipeline.impl.FlinkYarnApplicationBuildPipeline; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.collections.CollectionUtils; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; @@ -86,11 +88,15 @@ import javax.annotation.Nonnull; import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -164,7 +170,7 @@ public boolean buildApplication(@Nonnull Application app, ApplicationLog applica pipeline.registerWatcher( new PipeWatcher() { @Override - public void onStart(PipeSnapshot snapshot) { + public void onStart(PipeSnapshot snapshot) throws Exception { AppBuildPipeline buildPipeline = AppBuildPipeline.fromPipeSnapshot(snapshot).setAppId(app.getId()); saveEntity(buildPipeline); @@ -301,7 +307,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { mainClass, yarnProvidedPath, app.getDevelopmentMode(), - app.getDependencyInfo()); + app.getMavenArtifact()); log.info("Submit params to building pipeline : {}", yarnAppRequest); return FlinkYarnApplicationBuildPipeline.of(yarnAppRequest); case YARN_PER_JOB: @@ -316,7 +322,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { app.getExecutionModeEnum(), app.getDevelopmentMode(), flinkEnv.getFlinkVersion(), - app.getDependencyInfo()); + app.getMavenArtifact()); log.info("Submit params to building pipeline : {}", buildRequest); return FlinkRemoteBuildPipeline.of(buildRequest); case KUBERNETES_NATIVE_SESSION: @@ -329,7 +335,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { app.getExecutionModeEnum(), app.getDevelopmentMode(), flinkEnv.getFlinkVersion(), - app.getDependencyInfo(), + app.getMavenArtifact(), app.getClusterId(), app.getK8sNamespace()); log.info("Submit params to building pipeline : {}", k8sSessionBuildRequest); @@ -344,7 +350,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { app.getExecutionModeEnum(), app.getDevelopmentMode(), flinkEnv.getFlinkVersion(), - app.getDependencyInfo(), + app.getMavenArtifact(), app.getClusterId(), app.getK8sNamespace(), app.getFlinkImage(), @@ -364,7 +370,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { } } - private void prepareJars(Application app) { + private void prepareJars(Application app) throws IOException { File localUploadDIR = new File(Workspace.local().APP_UPLOADS()); if (!localUploadDIR.exists()) { localUploadDIR.mkdirs(); @@ -373,8 +379,8 @@ private void prepareJars(Application app) { FsOperator localFS = FsOperator.lfs(); // 1. copy jar to local upload dir if (app.isFlinkSqlJob() || app.isUploadJob()) { - if (!app.getDependencyObject().getJar().isEmpty()) { - for (String jar : app.getDependencyObject().getJar()) { + if (!app.getMavenDependency().getJar().isEmpty()) { + for (String jar : app.getMavenDependency().getJar()) { File localJar = new File(WebUtils.getAppTempDir(), jar); File localUploadJar = new File(localUploadDIR, jar); if (!localJar.exists() && !localUploadJar.exists()) { @@ -398,21 +404,10 @@ private void prepareJars(Application app) { checkOrElseUploadJar(localFS, localJar, localUploadJar, localUploadDIR); // 2) copy jar to local $app_home/lib - boolean cleanUpload = false; File libJar = new File(app.getLocalAppLib(), app.getJar()); - if (!localFS.exists(app.getLocalAppLib())) { - cleanUpload = true; - } else { - if (libJar.exists()) { - if (!FileUtils.equals(localJar, libJar)) { - cleanUpload = true; - } - } else { - cleanUpload = true; - } - } - - if (cleanUpload) { + if (!localFS.exists(app.getLocalAppLib()) + || !libJar.exists() + || !FileUtils.equals(localJar, libJar)) { localFS.mkCleanDirs(app.getLocalAppLib()); localFS.upload(localUploadJar.getAbsolutePath(), app.getLocalAppLib()); } @@ -421,22 +416,48 @@ private void prepareJars(Application app) { if (app.getExecutionModeEnum() == ExecutionMode.YARN_APPLICATION) { List jars = new ArrayList<>(0); - // 1) user jar + // 1). user jar jars.add(libJar); // 2). jar dependency - app.getDependencyObject() - .getJar() - .forEach(jar -> jars.add(new File(localUploadDIR, jar))); + app.getMavenDependency().getJar().forEach(jar -> jars.add(new File(localUploadDIR, jar))); // 3). pom dependency - if (!app.getDependencyInfo().mavenArts().isEmpty()) { - jars.addAll(MavenTool.resolveArtifactsAsJava(app.getDependencyInfo().mavenArts())); + if (!app.getMavenDependency().getPom().isEmpty()) { + Set artifacts = + app.getMavenDependency().getPom().stream() + .filter(x -> !new File(localUploadDIR, x.artifactName()).exists()) + .map( + pom -> + new Artifact( + pom.getGroupId(), + pom.getArtifactId(), + pom.getVersion(), + pom.getClassifier(), + pom.toExclusionString())) + .collect(Collectors.toSet()); + Set mavenArts = MavenTool.resolveArtifactsAsJava(artifacts); + jars.addAll(mavenArts); + } + + // 4). local uploadDIR to hdfs uploadsDIR + String hdfsUploadDIR = Workspace.remote().APP_UPLOADS(); + for (File jarFile : jars) { + String hdfsUploadPath = hdfsUploadDIR + "/" + jarFile.getName(); + if (!fsOperator.exists(hdfsUploadPath)) { + fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR); + } else { + InputStream inputStream = Files.newInputStream(jarFile.toPath()); + if (!DigestUtils.md5Hex(inputStream).equals(fsOperator.fileMd5(hdfsUploadPath))) { + fsOperator.upload(jarFile.getAbsolutePath(), hdfsUploadDIR); + } + } } + // 5). copy jars to $hdfs_app_home/lib fsOperator.mkCleanDirs(app.getAppLib()); - // 4). upload jars to appLibDIR - jars.forEach(jar -> fsOperator.upload(jar.getAbsolutePath(), app.getAppLib())); + jars.forEach( + jar -> fsOperator.copy(hdfsUploadDIR + "/" + jar.getName(), app.getAppLib())); } } else { String appHome = app.getAppHome(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index e894bd626c..99da52420b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -40,6 +40,7 @@ import org.apache.streampark.console.base.util.CommonUtils; import org.apache.streampark.console.base.util.ObjectUtils; import org.apache.streampark.console.base.util.WebUtils; +import org.apache.streampark.console.core.bean.MavenDependency; import org.apache.streampark.console.core.entity.AppBuildPipeline; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.ApplicationConfig; @@ -852,12 +853,10 @@ public boolean update(Application appParam) { application.setRelease(ReleaseState.NEED_RELEASE.get()); if (application.isUploadJob()) { - Application.Dependency thisDependency = - Application.Dependency.toDependency(appParam.getDependency()); - Application.Dependency targetDependency = - Application.Dependency.toDependency(application.getDependency()); + MavenDependency thisDependency = MavenDependency.of(appParam.getDependency()); + MavenDependency targetDependency = MavenDependency.of(application.getDependency()); - if (!thisDependency.eq(targetDependency)) { + if (!thisDependency.equals(targetDependency)) { application.setDependency(appParam.getDependency()); application.setBuild(true); } else if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java index b734a7a033..fab29a74e0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java @@ -738,9 +738,6 @@ private CheckPoints httpCheckpoints(Application application) throws Exception { private T yarnRestRequest(String url, Class clazz) throws IOException { String result = YarnUtils.restRequest(url); - if (null == result) { - return null; - } return JacksonUtils.read(result, clazz); } diff --git a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue index 36ab854c59..35c0802696 100644 --- a/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue +++ b/streampark-console/streampark-console-webapp/src/components/Table/src/components/HeaderCell.vue @@ -22,7 +22,7 @@ props: { column: { type: Object as PropType, - default: () => ({}) as BasicColumn, + default: () => ({} as BasicColumn), }, }, setup(props) { diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue index bf6aa9543e..5c9686a456 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/Dependency.vue @@ -33,6 +33,7 @@ import { fetchUpload } from '/@/api/flink/app/app'; import { fetchUploadJars } from '/@/api/flink/app/flinkHistory'; import UploadJobJar from './UploadJobJar.vue'; + import { fetchFlinkEnv } from '/@/api/flink/setting/flinkEnv'; interface DependencyType { artifactId: string; @@ -89,10 +90,16 @@ Swal.fire('Failed', t('flink.app.dependencyError'), 'error'); return; } - const scalaVersion = props.flinkEnvs.find((v) => v.id === versionId)?.scalaVersion; + + let flinkEnv = props.flinkEnvs || []; + if (props.flinkEnvs?.length == 0) { + flinkEnv = await fetchFlinkEnv(); + } + const scalaVersion = flinkEnv.find((v) => v.id === versionId)?.scalaVersion; if (props.value == null || props.value.trim() === '') { return; } + const groupExp = /([\s\S]*?)<\/groupId>/; const artifactExp = /([\s\S]*?)<\/artifactId>/; const versionExp = /([\s\S]*?)<\/version>/; @@ -125,26 +132,23 @@ mvnPom.classifier = classifier; } const id = getId(mvnPom); - const pomExclusion = {}; + const pomExclusion = []; if (exclusion != null) { const exclusions = exclusion.split(''); exclusions.forEach((e) => { if (e != null && e.length > 0) { const e_group = e.match(groupExp) ? groupExp.exec(e)![1].trim() : null; const e_artifact = e.match(artifactExp) ? artifactExp.exec(e)![1].trim() : null; - const id = e_group + '_' + e_artifact; - pomExclusion[id] = { + pomExclusion.push({ groupId: e_group, artifactId: e_artifact, - }; + }); } }); } mvnPom.exclusions = pomExclusion; dependency.pom[id] = mvnPom; } - } else { - console.error('dependency error...'); } }); diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts index 752ad30e0e..2a1a3b9150 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts @@ -597,7 +597,13 @@ export const useCreateAndEditSchema = ( }, ]; }); + onMounted(async () => { + //get flinkEnv + fetchFlinkEnv().then((res) => { + flinkEnvs.value = res; + }); + /* Get project data */ fetchSelect({}).then((res) => { projectList.value = res; @@ -608,10 +614,6 @@ export const useCreateAndEditSchema = ( alerts.value = res; }); - //get flinkEnv - fetchFlinkEnv().then((res) => { - flinkEnvs.value = res; - }); //get flinkCluster fetchFlinkCluster().then((res) => { flinkClusters.value = res; diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts index 87b8ab3978..1e0291fa8e 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/utils/Pom.ts @@ -22,65 +22,40 @@ export function toPomString(pom) { const classifier = pom.classifier; const exclusions = pom.exclusions || []; let exclusionString = ''; - let pomString = ''; + let classifierString = ''; if (exclusions.length > 0) { exclusions.forEach((item) => { exclusionString += - ' \n' + - ' ' + + ' \n' + + ' ' + item.groupId + '\n' + - ' ' + + ' ' + item.artifactId + '\n' + - ' \n'; + ' \n'; }); - pomString = - ' \n' + - ' ' + - groupId + - '\n' + - ' ' + - artifactId + - '\n' + - ' ' + - version + - '\n' + - ' \n' + - exclusionString + - ' \n' + - ' '; - } else { - if (classifier != null) { - pomString = - ' \n' + - ' ' + - groupId + - '\n' + - ' ' + - artifactId + - '\n' + - ' ' + - version + - '\n' + - ' ' + - classifier + - '\n' + - ' '; - } else { - pomString = - ' \n' + - ' ' + - groupId + - '\n' + - ' ' + - artifactId + - '\n' + - ' ' + - version + - '\n' + - ' '; - } + exclusionString = ' \n' + exclusionString + ' \n'; } + + if (classifier != null) { + classifierString = ' ' + classifier + '\n'; + } + + const pomString = + ' \n' + + ' ' + + groupId + + '\n' + + ' ' + + artifactId + + '\n' + + ' ' + + version + + '\n' + + classifierString + + exclusionString + + ' '; + return pomString; } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 58dc7f82bb..6a2e8c95c1 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -207,28 +207,22 @@ trait FlinkClientTrait extends Logger { jobGraphFunc(submitRequest, flinkConfig, jarFile) } match { case Failure(e) => - logWarn( - s"""\n - |[flink-submit] JobGraph Submit Plan failed, error detail: - |------------------------------------------------------------------ - |${Utils.stringifyException(e)} - |------------------------------------------------------------------ - |Now retry submit with RestAPI Plan ... - |""".stripMargin - ) Try(restApiFunc(submitRequest, flinkConfig, jarFile)) match { case Success(r) => r - case Failure(e) => - logError( + case Failure(e1) => + throw new RuntimeException( s"""\n - |[flink-submit] RestAPI Submit failed, error detail: + |[flink-submit] Both JobGraph submit plan and Rest API submit plan all failed! + |JobGraph Submit plan failed detail: |------------------------------------------------------------------ |${Utils.stringifyException(e)} |------------------------------------------------------------------ - |Both JobGraph submit plan and Rest API submit plan all failed! - |""".stripMargin - ) - throw e + | + | RestAPI Submit plan failed detail: + | ------------------------------------------------------------------ + |${Utils.stringifyException(e1)} + |------------------------------------------------------------------ + |""".stripMargin) } case Success(v) => v } @@ -239,18 +233,23 @@ trait FlinkClientTrait extends Logger { submitRequest: SubmitRequest, jarFile: File): (PackagedProgram, JobGraph) = { - val packageProgram = PackagedProgram.newBuilder + val pgkBuilder = PackagedProgram.newBuilder .setJarFile(jarFile) - .setUserClassPaths( - Lists.newArrayList(submitRequest.flinkVersion.flinkLibs: _*) - ) .setEntryPointClassName( flinkConfig.getOptional(ApplicationConfiguration.APPLICATION_MAIN_CLASS).get()) .setSavepointRestoreSettings(submitRequest.savepointRestoreSettings) - .setArguments(flinkConfig - .getOptional(ApplicationConfiguration.APPLICATION_ARGS) - .orElse(Lists.newArrayList()): _*) - .build() + .setArguments( + flinkConfig + .getOptional(ApplicationConfiguration.APPLICATION_ARGS) + .orElse(Lists.newArrayList()): _*) + // userClassPath... + submitRequest.executionMode match { + case ExecutionMode.REMOTE | ExecutionMode.YARN_PER_JOB => + pgkBuilder.setUserClassPaths(submitRequest.flinkVersion.flinkLibs) + case _ => + } + + val packageProgram = pgkBuilder.build() val jobGraph = PackagedProgramUtils.createJobGraph( packageProgram, diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala index cff3c03984..1e8245228a 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/Artifact.scala @@ -19,15 +19,17 @@ package org.apache.streampark.flink.packer.maven import org.eclipse.aether.artifact.{Artifact => AetherArtifact} +import java.util.{Collections, Set => JavaSet} import java.util.regex.Pattern case class Artifact( groupId: String, artifactId: String, version: String, - classifier: String = null) { + classifier: String = null, + extensions: JavaSet[String] = Collections.emptySet()) { - def eq(artifact: AetherArtifact): Boolean = { + def filter(artifact: AetherArtifact): Boolean = { artifact.getGroupId match { case g if g == groupId => artifact.getArtifactId match { @@ -37,7 +39,6 @@ case class Artifact( case _ => false } } - } object Artifact { diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenArtifact.scala similarity index 79% rename from streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala rename to streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenArtifact.scala index c80e416e68..0b39f53d11 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenArtifact.scala @@ -27,17 +27,17 @@ import scala.collection.JavaConversions._ * @param extJarLibs * collection of jar lib paths, which elements can be a directory or file path. */ -case class DependencyInfo(mavenArts: Set[Artifact] = Set(), extJarLibs: Set[String] = Set()) { +case class MavenArtifact(mavenArts: Set[Artifact] = Set(), extJarLibs: Set[String] = Set()) { def this(mavenArts: JavaList[Artifact], extJarLibs: JavaList[String]) { this(mavenArts.toSet, extJarLibs.toSet) } - def merge(jarLibs: Set[String]): DependencyInfo = - if (jarLibs != null) DependencyInfo(mavenArts, extJarLibs ++ jarLibs) else this.copy() + def merge(jarLibs: Set[String]): MavenArtifact = + if (jarLibs != null) MavenArtifact(mavenArts, extJarLibs ++ jarLibs) else this.copy() } -object DependencyInfo { - def empty: DependencyInfo = new DependencyInfo() +object MavenArtifact { + def empty: MavenArtifact = new MavenArtifact() } diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala index 05562a870e..a56f413218 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala @@ -43,7 +43,6 @@ import javax.annotation.{Nonnull, Nullable} import java.io.File import java.util -import java.util.{HashSet, Set => JavaSet} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -56,8 +55,10 @@ object MavenTool extends Logger { private[this] val excludeArtifact = List( Artifact.of("org.apache.flink:force-shading:*"), + Artifact.of("org.apache.flink:flink-shaded-force-shading:*"), Artifact.of("com.google.code.findbugs:jsr305:*"), - Artifact.of("org.apache.logging.log4j:*:*")) + Artifact.of("org.apache.logging.log4j:*:*") + ) private[this] def getRemoteRepos(): List[RemoteRepository] = { val builder = @@ -145,7 +146,7 @@ object MavenTool extends Logger { } req.setResourceTransformers(transformer.toList) // issue: https://github.com/apache/incubator-streampark/issues/2350 - req.setFilters(List(new ShadeFilter)) + req.setFilters(List(new ShadedFilter)) req.setRelocators(Lists.newArrayList()) req } @@ -159,7 +160,7 @@ object MavenTool extends Logger { /** * Build a fat-jar with custom jar librarties and maven artifacts. * - * @param dependencyInfo + * @param artifact * maven artifacts and jar libraries for building a fat-jar * @param outFatJarPath * output paths of fat-jar, like "/streampark/workspace/233/my-fat.jar" @@ -167,10 +168,10 @@ object MavenTool extends Logger { @throws[Exception] def buildFatJar( @Nullable mainClass: String, - @Nonnull dependencyInfo: DependencyInfo, + @Nonnull artifact: MavenArtifact, @Nonnull outFatJarPath: String): File = { - val jarLibs = dependencyInfo.extJarLibs - val arts = dependencyInfo.mavenArts + val jarLibs = artifact.extJarLibs + val arts = artifact.mavenArts if (jarLibs.isEmpty && arts.isEmpty) { throw new Exception(s"[StreamPark] streampark-packer: empty artifacts.") } @@ -178,8 +179,8 @@ object MavenTool extends Logger { buildFatJar(mainClass, jarLibs ++ artFilePaths, outFatJarPath) } - def resolveArtifactsAsJava(mavenArtifacts: Set[Artifact]): JavaSet[File] = resolveArtifacts( - mavenArtifacts).asJava + def resolveArtifactsAsJava(mavenArtifacts: util.Set[Artifact]): util.Set[File] = resolveArtifacts( + mavenArtifacts.toSet).asJava /** * Resolve the collectoin of artifacts, Artifacts will be download to ConfigConst.MAVEN_LOCAL_DIR @@ -192,37 +193,55 @@ object MavenTool extends Logger { */ @throws[Exception] def resolveArtifacts(mavenArtifacts: Set[Artifact]): Set[File] = { - if (mavenArtifacts == null) Set.empty[File]; - else { - val (repoSystem, session) = getMavenEndpoint() - val artifacts = mavenArtifacts.map( - e => { - new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", e.version) - }) - logInfo(s"start resolving dependencies: ${artifacts.mkString}") - - val remoteRepos = getRemoteRepos() - // read relevant artifact descriptor info - // plz don't simplify the following lambda syntax to maintain the readability of the code. - val resolvedArtifacts = artifacts - .map(artifact => new ArtifactDescriptorRequest(artifact, remoteRepos, null)) - .map(artDescReq => repoSystem.readArtifactDescriptor(session, artDescReq)) - .flatMap(_.getDependencies) - .filter(_.getScope == "compile") - .filter(x => !excludeArtifact.exists(_.eq(x.getArtifact))) - .map(_.getArtifact) - - val mergedArtifacts = artifacts ++ resolvedArtifacts - logInfo(s"resolved dependencies: ${mergedArtifacts.mkString}") - - // download artifacts - val artReqs = - mergedArtifacts.map(artifact => new ArtifactRequest(artifact, remoteRepos, null)) - repoSystem - .resolveArtifacts(session, artReqs) - .map(_.getArtifact.getFile) - .toSet + if (mavenArtifacts == null) { + return Set.empty[File] } + + val (repoSystem, session) = getMavenEndpoint() + val artifacts = mavenArtifacts.map( + e => { + new DefaultArtifact(e.groupId, e.artifactId, e.classifier, "jar", e.version) -> e.extensions + }) + + logInfo(s"start resolving dependencies: ${artifacts.mkString}") + + val remoteRepos = getRemoteRepos() + // read relevant artifact descriptor info + // plz don't simplify the following lambda syntax to maintain the readability of the code. + val dependenciesArtifacts = artifacts + .map(artifact => new ArtifactDescriptorRequest(artifact._1, remoteRepos, null) -> artifact._2) + .map(descReq => repoSystem.readArtifactDescriptor(session, descReq._1) -> descReq._2) + .flatMap( + result => + result._1.getDependencies + .filter( + dep => { + dep.getScope match { + case "compile" if !excludeArtifact.exists(_.filter(dep.getArtifact)) => + val ga = s"${dep.getArtifact.getGroupId}:${dep.getArtifact.getArtifactId}" + val exclusion = result._2.contains(ga) + if (exclusion) { + val art = result._1.getArtifact + val name = s"${art.getGroupId}:${art.getArtifactId}" + logInfo(s"[MavenTool] $name dependencies exclusion $ga") + } + !exclusion + case _ => false + } + }) + .map(_.getArtifact)) + + val mergedArtifacts = artifacts.map(_._1) ++ dependenciesArtifacts + + logInfo(s"resolved dependencies: ${mergedArtifacts.mkString}") + + // download artifacts + val artReqs = + mergedArtifacts.map(artifact => new ArtifactRequest(artifact, remoteRepos, null)) + repoSystem + .resolveArtifacts(session, artReqs) + .map(_.getArtifact.getFile) + .toSet } /** create composite maven endpoint */ @@ -256,13 +275,13 @@ object MavenTool extends Logger { (repoSystem, session) } - class ShadeFilter extends Filter { + private class ShadedFilter extends Filter { override def canFilter(jar: File): Boolean = true override def isFiltered(name: String): Boolean = { if (name.startsWith("META-INF/")) { if (name.endsWith(".SF") || name.endsWith(".DSA") || name.endsWith(".RSA")) { - logInfo(s"shade ignore file: $name") + logInfo(s"shaded ignore file: $name") return true } } @@ -271,5 +290,4 @@ object MavenTool extends Logger { override def finished(): Unit = {} } - } diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala index 5abc55b924..41c56aa6b9 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildRequest.scala @@ -21,7 +21,7 @@ import org.apache.streampark.common.conf.{FlinkVersion, Workspace} import org.apache.streampark.common.enums.{DevelopmentMode, ExecutionMode} import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates import org.apache.streampark.flink.packer.docker.DockerConf -import org.apache.streampark.flink.packer.maven.DependencyInfo +import org.apache.streampark.flink.packer.maven.MavenArtifact import scala.collection.mutable.ArrayBuffer @@ -44,17 +44,17 @@ sealed trait FlinkBuildParam extends BuildParam { def flinkVersion: FlinkVersion - def dependencyInfo: DependencyInfo + def dependency: MavenArtifact def customFlinkUserJar: String - lazy val providedLibs: DependencyInfo = { + lazy val providedLibs: MavenArtifact = { val providedLibs = ArrayBuffer(localWorkspace.APP_JARS, localWorkspace.APP_PLUGINS, customFlinkUserJar) if (developmentMode == DevelopmentMode.FLINK_SQL) { providedLibs += s"${localWorkspace.APP_SHIMS}/flink-${flinkVersion.majorVersion}" } - dependencyInfo.merge(providedLibs.toSet) + dependency.merge(providedLibs.toSet) } def getShadedJarPath(rootWorkspace: String): String = { @@ -79,7 +79,7 @@ case class FlinkK8sSessionBuildRequest( executionMode: ExecutionMode, developmentMode: DevelopmentMode, flinkVersion: FlinkVersion, - dependencyInfo: DependencyInfo, + dependency: MavenArtifact, clusterId: String, k8sNamespace: String) extends FlinkK8sBuildParam @@ -92,7 +92,7 @@ case class FlinkK8sApplicationBuildRequest( executionMode: ExecutionMode, developmentMode: DevelopmentMode, flinkVersion: FlinkVersion, - dependencyInfo: DependencyInfo, + dependency: MavenArtifact, clusterId: String, k8sNamespace: String, flinkBaseImage: String, @@ -110,7 +110,7 @@ case class FlinkRemotePerJobBuildRequest( executionMode: ExecutionMode, developmentMode: DevelopmentMode, flinkVersion: FlinkVersion, - dependencyInfo: DependencyInfo) + dependency: MavenArtifact) extends FlinkBuildParam case class FlinkYarnApplicationBuildRequest( @@ -118,5 +118,5 @@ case class FlinkYarnApplicationBuildRequest( mainClass: String, yarnProvidedPath: String, developmentMode: DevelopmentMode, - dependencyInfo: DependencyInfo) + dependencyInfo: MavenArtifact) extends BuildParam diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala index 72d47991ed..060209ff94 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala @@ -91,7 +91,7 @@ class FlinkK8sApplicationBuildPipeline(request: FlinkK8sApplicationBuildRequest) execStep(3) { val shadedJarOutputPath = request.getShadedJarPath(buildWorkspace) val extJarLibs = request.developmentMode match { - case DevelopmentMode.FLINK_SQL => request.dependencyInfo.extJarLibs + case DevelopmentMode.FLINK_SQL => request.dependency.extJarLibs case DevelopmentMode.CUSTOM_CODE => Set[String]() } val shadedJar = diff --git a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala index 7b0803c456..06831510c9 100644 --- a/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala +++ b/streampark-flink/streampark-flink-packer/src/test/scala/org/apache/streampark/flink/packer/MavenToolSpec.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.packer -import org.apache.streampark.flink.packer.maven.{Artifact, DependencyInfo, MavenTool} +import org.apache.streampark.flink.packer.maven.{Artifact, MavenArtifact, MavenTool} import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfterAll @@ -87,7 +87,7 @@ class MavenToolSpec extends AnyWordSpec with BeforeAndAfterAll with Matchers { val fatJarPath = outputDir.concat("fat-3.jar") val fatJar = MavenTool.buildFatJar( null, - DependencyInfo( + MavenArtifact( Set(Artifact.of("org.apache.flink:flink-connector-kafka_2.11:1.13.0")), Set(path("jars/commons-dbutils-1.7.jar"))), fatJarPath)