Skip to content

Commit

Permalink
[Bug] deploy flink job on yarn, get state bug fixed. (#3329)
Browse files Browse the repository at this point in the history
* [Improve] resolve maven artifacts improvement

* [Bug] extract programArgs bug fixed

* [Improve] job maven dependency support exclusion

* [Improve] maven dependency check improvement

---------

Co-authored-by: benjobs <[email protected]>
  • Loading branch information
wolfboys and benjobs authored Nov 11, 2023
1 parent f23d515 commit 97cc84c
Show file tree
Hide file tree
Showing 22 changed files with 492 additions and 306 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,18 @@ object PropertiesUtils extends Logger {
}
}
programArgs += value.substring(1, value.length - 1)
case _ => programArgs += v
case _ =>
val regexp = "(.*)='(.*)'$"
if (v.matches(regexp)) {
programArgs += v.replaceAll(regexp, "$1=$2")
} else {
val regexp = "(.*)=\"(.*)\"$"
if (v.matches(regexp)) {
programArgs += v.replaceAll(regexp, "$1=$2")
} else {
programArgs += v
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.http.client.methods.HttpGet
import org.apache.http.client.protocol.HttpClientContext
import org.apache.http.impl.client.HttpClients

import java.io.IOException
import java.net.InetAddress
import java.security.PrivilegedExceptionAction
import java.util
Expand Down Expand Up @@ -257,20 +258,19 @@ object YarnUtils extends Logger {
* url
* @return
*/
@throws[IOException]
def restRequest(url: String): String = {
if (url == null) return null

url match {
case u if u.matches("^http(|s)://.*") =>
Try(request(url)) match {
case Success(v) => v
case Failure(e) =>
if (hasYarnHttpKerberosAuth) {
logError(s"yarnUtils authRestRequest error, url: $u, detail: $e")
throw new IOException(s"yarnUtils authRestRequest error, url: $u, detail: $e")
} else {
logError(s"yarnUtils restRequest error, url: $u, detail: $e")
throw new IOException(s"yarnUtils restRequest error, url: $u, detail: $e")
}
null
}
case _ =>
Try(request(s"${getRMWebAppURL()}/$url")) match {
Expand All @@ -281,8 +281,7 @@ object YarnUtils extends Logger {
} match {
case Success(v) => v
case Failure(e) =>
logError(s"yarnUtils restRequest retry 5 times all failed. detail: $e")
null
throw new IOException(s"yarnUtils restRequest retry 5 times all failed. detail: $e")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,20 @@ import scala.language.postfixOps
class PropertiesUtilsTestCase {

@Test def testExtractProgramArgs(): Unit = {
val args =
"mysql-sync-database \n--database employees \n--mysql-conf hostname=127.0.0.1 \n--mysql-conf port=3306 \n--mysql-conf username=root \n--mysql-conf password=123456 \n--mysql-conf database-name=employees \n--including-tables 'test|test.*' \n--sink-conf fenodes=127.0.0.1:8030 \n--sink-conf username=root \n--sink-conf password= \n--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \n--sink-conf sink.label-prefix=label\n--table-conf replication_num=1 "
val args = "mysql-sync-table \n" +
"--warehouse hdfs:///paimon \n" +
"--database test_db \n" +
"--table test_table \n" +
"--mysql-conf hostname=localhost \n" +
"--mysql-conf username=root \n" +
"--mysql-conf password=123456 \n" +
"--mysql-conf database-name='employees' \n" +
"--mysql-conf table-name='employees' \n" +
"--catalog-conf metastore=hive \n" +
"--catalog-conf uri=thrift://localhost:9083 \n" +
"--table-conf bucket=1 \n" +
"--table-conf changelog-producer=input \n" +
"--table-conf sink.parallelism=1"
val programArgs = new ArrayBuffer[String]()
programArgs ++= PropertiesUtils.extractArguments(args)
println(programArgs)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.bean;

import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.console.base.util.JacksonUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.flink.packer.maven.Artifact;
import org.apache.streampark.flink.packer.maven.MavenArtifact;

import org.apache.commons.lang3.StringUtils;

import lombok.Data;
import lombok.SneakyThrows;

import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Data
public class MavenDependency {

private Set<MavenPom> pom = Collections.emptySet();

private Set<String> jar = Collections.emptySet();

@SneakyThrows
public static MavenDependency of(String dependency) {
if (StringUtils.isNotBlank(dependency)) {
return JacksonUtils.read(dependency, MavenDependency.class);
}
return new MavenDependency();
}

@Override
public boolean equals(Object that) {
if (this == that) {
return true;
}

if (that == null || getClass() != that.getClass()) {
return false;
}

MavenDependency thatDep = (MavenDependency) that;

if (this.pom.size() != thatDep.pom.size()
|| this.jar.size() != thatDep.jar.size()
|| !this.pom.containsAll(thatDep.pom)) {
return false;
}

File localJar = WebUtils.getAppTempDir();
File localUploads = new File(Workspace.local().APP_UPLOADS());
for (String jarName : jar) {
if (!thatDep.jar.contains(jarName)
|| !FileUtils.equals(new File(localJar, jarName), new File(localUploads, jarName))) {
return false;
}
}

return true;
}

public MavenArtifact toMavenArtifact() {
List<Artifact> mvnArts =
this.pom.stream()
.map(
pom ->
new Artifact(
pom.getGroupId(),
pom.getArtifactId(),
pom.getVersion(),
pom.getClassifier(),
pom.toExclusionString()))
.collect(Collectors.toList());
List<String> extJars =
this.jar.stream()
.map(jar -> Workspace.local().APP_UPLOADS() + "/" + jar)
.collect(Collectors.toList());
return new MavenArtifact(mvnArts, extJars);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.bean;

import lombok.Data;

import java.util.Objects;

@Data
public class MavenExclusion {

private String groupId;

private String artifactId;

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

MavenExclusion that = (MavenExclusion) o;
if (this.groupId == null || that.groupId == null) {
return false;
}
if (this.artifactId == null || that.artifactId == null) {
return false;
}
return this.groupId.equals(that.groupId) && this.artifactId.equals(that.artifactId);
}

@Override
public int hashCode() {
return Objects.hash(groupId, artifactId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.streampark.console.core.bean;

import org.apache.commons.lang3.StringUtils;

import lombok.Data;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

@Data
public class MavenPom {
private String groupId;
private String artifactId;
private String version;
private String classifier;
private Set<MavenExclusion> exclusions = Collections.emptySet();

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

MavenPom that = (MavenPom) o;

boolean basic =
this.groupId.equals(that.groupId)
&& this.artifactId.equals(that.artifactId)
&& this.version.equals(that.version);

boolean classify =
StringUtils.isAllBlank(this.classifier, that.classifier)
|| this.classifier.equals(that.classifier);

if (basic && classify) {
Set<MavenExclusion> thisEx =
this.exclusions == null ? Collections.emptySet() : this.exclusions;
Set<MavenExclusion> thatEx =
that.exclusions == null ? Collections.emptySet() : that.exclusions;
return thisEx.size() == thatEx.size() && thisEx.containsAll(thatEx);
}
return false;
}

public String artifactName() {
if (StringUtils.isBlank(classifier)) {
return String.format("%s-%s.jar", artifactId, version);
}
return String.format("%s-%s-%s.jar", artifactId, version, classifier);
}

@Override
public int hashCode() {
return Objects.hash(groupId, artifactId, version, classifier);
}

@Override
public String toString() {
return groupId + ":" + artifactId + ":" + version + getClassifier();
}

public Set<String> toExclusionString() {
return this.exclusions.stream()
.map(x -> String.format("%s:%s", x.getGroupId(), x.getArtifactId()))
.collect(Collectors.toSet());
}

public String getClassifier() {
return StringUtils.isBlank(classifier) ? "" : ":" + classifier;
}
}
Loading

0 comments on commit 97cc84c

Please sign in to comment.